package co.recloud.ariadne.thread;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import co.recloud.ariadne.config.Configuration;
import co.recloud.ariadne.model.Host;
import co.recloud.ariadne.model.Location;
import co.recloud.ariadne.model.logical.Column;
import co.recloud.ariadne.model.logical.Table;
import co.recloud.ariadne.model.logical.Transaction;
import co.recloud.ariadne.persistor.KeyKeyValuePersistor;
import co.recloud.ariadne.persistor.KeyKeyValuePersistorImpl;
import co.recloud.ariadne.persistor.KeyValuePersistor;
import co.recloud.ariadne.persistor.KeyValuePersistorImpl;
import co.recloud.ariadne.persistor.SchemaPersistor;
import co.recloud.ariadne.persistor.SchemaPersistorImpl;
import co.recloud.ariadne.queue.OutboundRequestQueue;
import co.recloud.ariadne.request.BlockMigrateRequest;
import co.recloud.ariadne.request.Request;
import co.recloud.ariadne.response.BlockMigrateResponse;
import co.recloud.ariadne.store.AddressTable;
import co.recloud.ariadne.store.DataStore;
import co.recloud.ariadne.store.HostTable;

public class PartitionerThread extends Thread {

	public void run() {
		while (true) {
			if(!Main.waitForServiceStatus()) {
				continue;
			}
			if(!Main.waitForIdle()) {
				continue;
			}
			HostTable ht = HostTable.getInstance();
            AddressTable at = AddressTable.getInstance();
			DataStore store = DataStore.getStore(ht.getLocalhost()
					.getLocation());
            Set<String> paths = new HashSet<String>();
            for(int x = 0; x < Configuration.DATA_FILE_WIDTH; x++) {
                for(int y = 0; y < Configuration.DATA_FILE_WIDTH; y++) {
                    Set<String> allPaths = store.getAllPaths(x, y);
                    for(String path : allPaths) {
                        if(at.isLocalPath(path) && at.isPathRanked(path)) {
                            paths.add(path);
                        }
                    }
                }
            }
			Map<String, Map<String, Table>> columnToParentTable = new HashMap<String, Map<String, Table>>();
			Map<String, Set<Column>> tableToChildColumns = new HashMap<String, Set<Column>>();
            Map<Integer, Map<Integer, Integer>> blockScores = new HashMap<Integer, Map<Integer, Integer>>();
			inferSchema(paths, columnToParentTable, tableToChildColumns);
			processRelatedPaths(paths, columnToParentTable, tableToChildColumns, blockScores);
            processBlocks(blockScores);
		}
	}
	
	private void processRelatedPaths(Set<String> paths, Map<String, Map<String, Table>> columnToParentTable, Map<String, Set<Column>> tableToChildColumns, 
                                     Map<Integer, Map<Integer, Integer>> blockScores ) {
		Transaction transaction = new Transaction();
		transaction.generate();
		transaction.setStartTime(Main.getTime());
        AddressTable at = AddressTable.getInstance();
		for(String path : paths) {
			String schema;
			String columnFamily;
			String key;
			String[] keyTokens = path.split(":", 2);
			String[] schemaTokens = keyTokens[0].split("\\.", 2);
			key = keyTokens[1];
			columnFamily = schemaTokens[1];
			schema = schemaTokens[0];
			if(schema.equals("key_values")) {
				KeyValuePersistor kv = new KeyValuePersistorImpl(
						transaction);
				KeyKeyValuePersistor kkv = new KeyKeyValuePersistorImpl(transaction);
				Table currentTable = new Table();
				currentTable.setName(columnFamily);
				Map<String, Object> row = kv.getAllColumns(key, currentTable);
				if(row != null) {
					for(String column : row.keySet()) {
						if(columnToParentTable.containsKey(columnFamily) && columnToParentTable.get(columnFamily).containsKey(column)) {
							Object value = row.get(column);
							if(value != null) {
								Table parentTable = columnToParentTable.get(columnFamily).get(column);
								if(parentTable != null) {
									String relatedPath = schema + "." + parentTable.getName() + ":" + value;
                                    if(at.isPathRanked(relatedPath)) {
                                        updateBlockScores(blockScores, path, relatedPath);
                                    }
								}
							}
						}
					}
					if(tableToChildColumns.containsKey(columnFamily) && tableToChildColumns.get(columnFamily) != null) {
						for(Column childColumn : tableToChildColumns.get(columnFamily)) {
							Set<String> childKeys = kkv.getParentToChild(new Column(currentTable, Table.PRIMARY_KEY), key, childColumn);
							for(String childKey : childKeys) {
								String relatedPath = schema + "." + childColumn.getTable().getName() + ":" + childKey;
                                if(at.isPathRanked(relatedPath)) {
                                    updateBlockScores(blockScores, path, relatedPath);
                                }
							}
						}
					}
				}
                try {
                    this.wait(1000);
                } catch (Exception e){

                }
			}
		}
		transaction.setCommitTime(Main.getTime());
		transaction.commit();
	}

    private void updateBlockScores(Map<Integer, Map<Integer, Integer>> blockScores, String path, String relatedPath) {
        AddressTable at = AddressTable.getInstance();
        int pathHash = at.translatePath(path);
        int relatedPathHash = at.translatePath(path);
        if(!blockScores.containsKey(pathHash)) {
            blockScores.put(pathHash, new HashMap<Integer, Integer>());
        }
        int score = 0;
        if(blockScores.get(pathHash).containsKey(relatedPathHash)) {
            score = blockScores.get(pathHash).get(relatedPathHash);
        }
        blockScores.get(pathHash).put(relatedPathHash, score + 1);
    }
    
	private void processBlocks(Map<Integer, Map<Integer, Integer>> blockScores) {
		HostTable ht = HostTable.getInstance();
		AddressTable at = AddressTable.getInstance();
		Location myLocation = ht.getLocalhost().getLocation();
        DataStore store = DataStore.getStore(myLocation);
        BlockingQueue<Request> queue = OutboundRequestQueue.getQueue();
        for (int start : blockScores.keySet()) {
            Map<Host, Integer> locations = new HashMap<Host, Integer>();
            for (int target : blockScores.get(start).keySet()) {
                Host targetHost = ht.findByHash(new Long(at.translateBlock(target)));
                int score = blockScores.get(start).get(target);
                if(!locations.containsKey(targetHost)) {
                    locations.put(targetHost, score);
                } else {
                    locations.put(targetHost, locations.get(targetHost) + score);
                }
            }
            Host champion = null, second = null;
            int championScore = 0, secondScore = 0;
            for (Host host : locations.keySet()) {
                int score = locations.get(host);
                if (champion == null || score > championScore) {
                    champion = host;
                    championScore = score;
                } else if (second == null || score > secondScore) {
                    second = host;
                    secondScore = score;
                }
            }
            if(championScore - secondScore > Configuration.THRESHOLD && !champion.equals(ht.getLocalhost())) {
                BlockMigrateRequest req = new BlockMigrateRequest();
                req.setBlock(store.getBlock(start));
                req.setAddress(start);
                req.setPrimary(champion);
                req.sendBlocking(champion);
                BlockMigrateResponse response = (BlockMigrateResponse) req.getResponse();
                if(response != null) {
                    if(response.isBlockAccepted()) {
                        int oldAddress = at.translateBlock(start);
                        at.remap(start, response.getNewAddress());
                        store.freeBlock(oldAddress);
                        System.out.println("Block " + oldAddress + " remapped");
                    }
                }
            }
        }
	}

	private void inferSchema(Set<String> paths, Map<String, Map<String, Table>> columnToParentTable, Map<String, Set<Column>> tableToChildColumns) {
		for (String path : paths) {
			String schema;
			String columnFamily;
			String key;
			String[] keyTokens = path.split(":", 2);
			String[] schemaTokens = keyTokens[0].split("\\.", 2);
			key = keyTokens[1];
			columnFamily = schemaTokens[1];
			schema = schemaTokens[0];
			if (schema.equals("key_values")) {
				Transaction transaction = new Transaction();
				transaction.generate();
				transaction.setStartTime(Main.getTime());
				KeyValuePersistor kv = new KeyValuePersistorImpl(
						transaction);
				SchemaPersistor schemaPersistor = new SchemaPersistorImpl(
						transaction);
				if (!columnToParentTable.containsKey(columnFamily)) {
					columnToParentTable.put(columnFamily,
							new HashMap<String, Table>());
				}
				Table currentTable = new Table();
				currentTable.setName(columnFamily);
				Map<String, Object> row = kv.getAllColumns(key, currentTable);
				if (row != null) {
					for (String column : row.keySet()) {
						Column currentColumn = new Column(currentTable,
								column);
						Table parentTable = null;
						if (!columnToParentTable.get(columnFamily)
								.containsKey(column)) {
							parentTable = schemaPersistor
									.getParentTable(currentColumn);
							if (parentTable != null) {
								columnToParentTable.get(columnFamily).put(
										column, parentTable);
							} else {
								columnToParentTable.get(columnFamily).put(
										column, null);
							}
						}
						if(parentTable != null) {
							if(!tableToChildColumns.containsKey(parentTable.getName())) {
								tableToChildColumns.put(parentTable.getName(), new HashSet<Column>());
							}
							tableToChildColumns.get(parentTable.getName()).add(currentColumn);
						}
					}
				}
				transaction.setCommitTime(Main.getTime());
				transaction.commit();
			}
		}
	}
}
